import numpy as np
import pandas as pd
import soundfile as sf
import scipy.spatial
import matplotlib.pyplot as plt
import IPython.display as ipd
from audio2numpy import open_audio
from numba import jit
import librosa, librosa.display
import IPython
%matplotlib inline
Fs = 44100
X_mp3, Fs = librosa.load('./Cruel Angels Thesis From MIDI.mp3', sr=Fs)
Y_mp3, Fs = librosa.load('./Cruel Angels Thesis From YouTube.mp3', sr=Fs)
N = 4410
H = 2205
X_stft = np.abs(librosa.stft(X_mp3))
Y_stft = np.abs(librosa.stft(Y_mp3))
X_chroma = librosa.feature.chroma_stft(y=X_mp3, sr=Fs)
Y_chroma = librosa.feature.chroma_stft(y=Y_mp3, sr=Fs)
plt.figure(figsize=(8, 2))
plt.title('Sequence $X$')
librosa.display.specshow(X_chroma, x_axis='frames', y_axis='chroma', cmap='inferno', hop_length=H)
plt.xlabel('Time (frames)')
plt.ylabel('Chroma')
plt.colorbar()
plt.clim([0, 1])
plt.tight_layout(); plt.show()
plt.figure(figsize=(8, 2))
plt.title('Sequence $Y$')
librosa.display.specshow(Y_chroma, x_axis='frames', y_axis='chroma', cmap='inferno', hop_length=H)
plt.xlabel('Time (frames)')
plt.ylabel('Chroma')
plt.colorbar()
plt.clim([0, 1])
plt.tight_layout(); plt.show()
print(X_chroma.shape)
print(Y_chroma.shape)
(12, 21130) (12, 21111)
def ensure_same_shape(array1, array2):
array1_rows = array1.shape[1]
array2_rows = array2.shape[1]
difference = array1_rows - array2_rows
if(difference > 0):
array2_padded = np.pad(array2, [(0, 0), (0, difference)], mode='constant')
return(array1, array2_padded)
elif(difference < 0):
array1_padded = np.pad(array1, [(0, 0), (0, abs(difference))], mode='constant')
return(array1_padded, array2)
else:
return(array1, array2)
X_chroma_standardized, Y_chroma_standardized = ensure_same_shape(X_chroma, Y_chroma)
print(X_chroma_standardized.shape)
print(Y_chroma_standardized.shape)
(12, 21130) (12, 21130)
def compute_cost_matrix(X, Y, metric='euclidean'):
"""Compute the cost matrix of two feature sequences
Notebook: C3/C3S2_DTWbasic.ipynb
Args:
X (np.ndarray): Sequence 1
Y (np.ndarray): Sequence 2
metric (str): Cost metric, a valid strings for scipy.spatial.distance.cdist (Default value = 'euclidean')
Returns:
C (np.ndarray): Cost matrix
"""
X, Y = np.atleast_2d(X_chroma_standardized, Y_chroma_standardized)
C = scipy.spatial.distance.cdist(X.T, Y.T, metric=metric)
return C
C = compute_cost_matrix(X_mp3, Y_mp3, metric='euclidean')
print('Cost matrix C =', C, sep='\n')
Cost matrix C = [[1.46373697 1.46373697 1.46373697 ... 1.46373697 1.46373697 1.46373697] [1.60108684 1.60108684 1.60108684 ... 1.60108684 1.60108684 1.60108684] [2.29136799 2.29136799 2.29136799 ... 2.29136799 2.29136799 2.29136799] ... [2.53758825 2.53758825 2.53758825 ... 2.53758825 2.53758825 2.53758825] [2.79364288 2.79364288 2.79364288 ... 2.79364288 2.79364288 2.79364288] [2.86072472 2.86072472 2.86072472 ... 2.86072472 2.86072472 2.86072472]]
@jit(nopython=True)
def compute_accumulated_cost_matrix(C):
"""Compute the accumulated cost matrix given the cost matrix
Notebook: C3/C3S2_DTWbasic.ipynb
Args:
C (np.ndarray): Cost matrix
Returns:
D (np.ndarray): Accumulated cost matrix
"""
N = C.shape[0]
M = C.shape[1]
D = np.zeros((N, M))
D[0, 0] = C[0, 0]
for n in range(1, N):
D[n, 0] = D[n-1, 0] + C[n, 0]
for m in range(1, M):
D[0, m] = D[0, m-1] + C[0, m]
for n in range(1, N):
for m in range(1, M):
D[n, m] = C[n, m] + min(D[n-1, m], D[n, m-1], D[n-1, m-1])
return D
D = compute_accumulated_cost_matrix(C)
print('Accumulated cost matrix D =', D, sep='\n')
print('DTW distance DTW(X, Y) =', D[-1, -1])
Accumulated cost matrix D = [[1.46373697e+00 2.92747394e+00 4.39121091e+00 ... 2.57120403e+04 2.57135040e+04 2.57149678e+04] [3.06482381e+00 3.06482381e+00 4.52856078e+00 ... 2.47154425e+04 2.47170436e+04 2.47186447e+04] [5.35619179e+00 5.35619179e+00 5.35619179e+00 ... 2.47161328e+04 2.47177339e+04 2.47193350e+04] ... [3.19114298e+04 3.19114298e+04 3.19114298e+04 ... 2.11464065e+04 2.11477842e+04 2.11491620e+04] [3.19142234e+04 3.19142234e+04 3.19142234e+04 ... 2.11478224e+04 2.11492002e+04 2.11505779e+04] [3.19170842e+04 3.19170842e+04 3.19170842e+04 ... 2.11493054e+04 2.11506832e+04 2.11520609e+04]] DTW distance DTW(X, Y) = 21152.0608776131
@jit(nopython=True)
def compute_optimal_warping_path(D):
"""Compute the warping path given an accumulated cost matrix
Notebook: C3/C3S2_DTWbasic.ipynb
Args:
D (np.ndarray): Accumulated cost matrix
Returns:
P (np.ndarray): Optimal warping path
"""
N = D.shape[0]
M = D.shape[1]
n = N - 1
m = M - 1
P = [(n, m)]
while n > 0 or m > 0:
if n == 0:
cell = (0, m - 1)
elif m == 0:
cell = (n - 1, 0)
else:
val = min(D[n-1, m-1], D[n-1, m], D[n, m-1])
if val == D[n-1, m-1]:
cell = (n-1, m-1)
elif val == D[n-1, m]:
cell = (n-1, m)
else:
cell = (n, m-1)
P.append(cell)
(n, m) = cell
P.reverse()
return np.array(P)
P = compute_optimal_warping_path(D)
#print('Optimal warping path P =', P.tolist())
c_P = sum(C[n, m] for (n, m) in P)
print('Total cost of optimal warping path:', c_P)
print('DTW distance DTW(X, Y) =', D[-1, -1])
Total cost of optimal warping path: 21152.0608776131 DTW distance DTW(X, Y) = 21152.0608776131
P = np.array(P)
plt.figure(figsize=(18, 6))
plt.subplot(1, 2, 1)
plt.imshow(C, cmap='inferno', origin='lower', aspect='equal')
plt.plot(P[:, 1], P[:, 0], marker='o', color='r')
plt.clim([0, np.max(C)])
plt.colorbar()
plt.title('$C$ with optimal warping path')
plt.xlabel('Sequence Y')
plt.ylabel('Sequence X')
plt.subplot(1, 2, 2)
plt.imshow(D, cmap='inferno', origin='lower', aspect='equal')
plt.plot(P[:, 1], P[:, 0], marker='o', color='r')
plt.clim([0, np.max(D)])
plt.colorbar()
plt.title('$D$ with optimal warping path')
plt.xlabel('Sequence Y')
plt.ylabel('Sequence X')
plt.tight_layout()
X_aligned = np.zeros_like(X_stft)
Y_aligned = np.zeros_like(Y_stft)
X_aligned, Y_aligned = ensure_same_shape(X_aligned, Y_aligned)
X_stft, Y_stft = ensure_same_shape(X_stft, Y_stft)
curr_i1 = -1
curr_i2 = -1
for i, (i1, i2) in enumerate(P):
if(i1 > curr_i1):
X_aligned[:, i1] = X_stft[:, i1]
curr_i1 = i1
if(i2 > curr_i2):
Y_aligned[:, i1] = Y_stft[:, i2]
curr_i2 = i2
N = 4410
H = 2205
X_aligned = librosa.istft(X_aligned)
Y_aligned = librosa.istft(Y_aligned)
aligned_audio = np.column_stack((X_aligned, Y_aligned))
sf.write('aligned_audio.mp3', aligned_audio, samplerate=44100, format='mp3')
IPython.display.Audio("aligned_audio.mp3")